fix: codis slot migrate#3233
Conversation
📝 WalkthroughWalkthroughChanges to pika_migrate_thread.cc and pika_slot_command.cc. First file: return value modified from 1 to 0 for successful key migration enqueueing. Second file: case 0 branch expanded to parse arguments, lookup commands via manager, and execute them with error handling for unknown or invalid commands. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/pika_migrate_thread.cc (1)
667-692:⚠️ Potential issue | 🟠 MajorReturn failure when migrate thread startup fails before signaling success
At Line 670-Line 675, startup failure is logged, but execution still reaches Line 692 and returns
0. The caller interprets0as success and executes the wrapped command, even though migration may not be running.Proposed fix
if (!is_migrating_) { ResetThread(); int ret = StartThread(); if (0 != ret) { LOG(ERROR) << "PikaMigrateThread::ReqMigrateOne StartThread failed. " << " ret=" << ret; is_migrating_ = false; StopThread(); + return -1; } else { LOG(INFO) << "PikaMigrateThread::ReqMigrateOne StartThread"; is_migrating_ = true; usleep(100); } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/pika_migrate_thread.cc` around lines 667 - 692, In ReqMigrateOne, when StartThread() fails the code logs the error but continues and returns 0, which signals success; change the control flow so that after detecting a non-zero ret from StartThread() (and after calling StopThread() / setting is_migrating_ = false) the function returns that error code immediately instead of falling through to queueing and returning 0; ensure the early return occurs before any use of IsMigrating, mgrtone_queue_, NotifyRequestMigrate, or the final return 0 so callers receive a failure status when the migrate thread fails to start.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/pika_slot_command.cc`:
- Line 1443: The command wrapper now returns a new discriminator value 2
(res_.AppendInteger(2)) but the Do() contract and related tests still
document/expect only -1/0/1; update the contract comment above Do() to list and
describe the new result code 2 and modify any unit/integration tests that assert
allowed responses to include 2 so callers handle it explicitly (search for Do(),
res_.AppendInteger(2), and tests referencing -1/0/1 to locate all occurrences).
- Around line 1444-1460: The handler currently builds args and then directly
indexes args[0], gets a Cmd via g_pika_cmd_table_manager->GetCmd(opt) and runs
Initial()/Execute() without verifying the resolved target key; change the flow
to first ensure args is non-empty, then lookup and call c_ptr->Initial(args,
db_->GetDBName()), check c_ptr->res().ok(), then inspect c_ptr->current_key():
if current_key() is empty, indicates multi-key (or multi-key flag if available),
or current_key() != key_, reject by setting res_.SetRes(CmdRes::kErrOther,
"wrapped command targets wrong key") (or similar), and only call
c_ptr->Execute() when current_key() is single and equals key_; use the symbols
args, opt, c_ptr, Initial, res().ok(), current_key(), key_, and Execute to
locate and implement this guard.
---
Outside diff comments:
In `@src/pika_migrate_thread.cc`:
- Around line 667-692: In ReqMigrateOne, when StartThread() fails the code logs
the error but continues and returns 0, which signals success; change the control
flow so that after detecting a non-zero ret from StartThread() (and after
calling StopThread() / setting is_migrating_ = false) the function returns that
error code immediately instead of falling through to queueing and returning 0;
ensure the early return occurs before any use of IsMigrating, mgrtone_queue_,
NotifyRequestMigrate, or the final return 0 so callers receive a failure status
when the migrate thread fails to start.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7bdca7d7-17d3-4f3e-806a-5c721b266178
📒 Files selected for processing (2)
src/pika_migrate_thread.ccsrc/pika_slot_command.cc
| res_.AppendInteger(0); | ||
| res_.AppendInteger(0); | ||
| case 0: { | ||
| res_.AppendInteger(2); |
There was a problem hiding this comment.
Document the new 2 result code.
The contract immediately above Do() still says this wrapper only returns -1/0/1, but Line 1443 now emits 2. Please update the command contract/tests with the new discriminator so callers do not keep treating this path as an unknown response.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/pika_slot_command.cc` at line 1443, The command wrapper now returns a new
discriminator value 2 (res_.AppendInteger(2)) but the Do() contract and related
tests still document/expect only -1/0/1; update the contract comment above Do()
to list and describe the new result code 2 and modify any unit/integration tests
that assert allowed responses to include 2 so callers handle it explicitly
(search for Do(), res_.AppendInteger(2), and tests referencing -1/0/1 to locate
all occurrences).
| PikaCmdArgsType args; | ||
| for (size_t i = 2; i < argv_.size(); ++i) { | ||
| args.push_back(argv_[i]); | ||
| } | ||
| std::string opt = args[0]; | ||
| pstd::StringToLower(opt); | ||
| std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(opt); | ||
| if (!c_ptr) { | ||
| res_.SetRes(CmdRes::kErrOther, "unknown command"); | ||
| return; | ||
| } | ||
| c_ptr->Initial(args, db_->GetDBName()); | ||
| if (!c_ptr->res().ok()) { | ||
| res_.SetRes(CmdRes::kErrOther, c_ptr->res().message()); | ||
| return; | ||
| } | ||
| c_ptr->Execute(); |
There was a problem hiding this comment.
Reject wrapped commands that do not resolve back to key_.
This branch executes client-supplied args verbatim, but never checks that the parsed command actually targets the wrapper key. A request like slotsmgrtexecwrapper foo set bar 1 will route on foo and mutate bar, and multi-key/admin commands can also slip through this path. During slot migration that breaks the single-slot guarantee and can send writes to the wrong shard. Validate args before indexing args[0], then reject anything whose current_key() is empty, multi-key, or different from key_ before Execute().
🔒 Suggested guardrails
PikaCmdArgsType args;
for (size_t i = 2; i < argv_.size(); ++i) {
args.push_back(argv_[i]);
}
+ if (args.empty()) {
+ res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsMgrtExecWrapper);
+ return;
+ }
std::string opt = args[0];
pstd::StringToLower(opt);
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(opt);
if (!c_ptr) {
res_.SetRes(CmdRes::kErrOther, "unknown command");
return;
}
c_ptr->Initial(args, db_->GetDBName());
if (!c_ptr->res().ok()) {
res_.SetRes(CmdRes::kErrOther, c_ptr->res().message());
return;
}
+ const auto keys = c_ptr->current_key();
+ if (keys.size() != 1 || keys[0] != key_) {
+ res_.SetRes(CmdRes::kErrOther, "wrapped command must target the wrapper key");
+ return;
+ }
c_ptr->Execute();🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/pika_slot_command.cc` around lines 1444 - 1460, The handler currently
builds args and then directly indexes args[0], gets a Cmd via
g_pika_cmd_table_manager->GetCmd(opt) and runs Initial()/Execute() without
verifying the resolved target key; change the flow to first ensure args is
non-empty, then lookup and call c_ptr->Initial(args, db_->GetDBName()), check
c_ptr->res().ok(), then inspect c_ptr->current_key(): if current_key() is empty,
indicates multi-key (or multi-key flag if available), or current_key() != key_,
reject by setting res_.SetRes(CmdRes::kErrOther, "wrapped command targets wrong
key") (or similar), and only call c_ptr->Execute() when current_key() is single
and equals key_; use the symbols args, opt, c_ptr, Initial, res().ok(),
current_key(), key_, and Execute to locate and implement this guard.
#3218
Summary by CodeRabbit